Integrating save/migrate handling with xfrd.
Add suspend callback to save.
#ifndef __XC_XC_IO_H__
#define __XC_XC_IO_H__
+#include <errno.h>
#include "xc_private.h"
#include "iostream.h"
IOStream *err;
char *vmconfig;
int vmconfig_n;
+ int (*suspend)(u32 domain, void *data);
+ void *data;
} XcIOContext;
+static inline int xcio_suspend_domain(XcIOContext *ctxt){
+ int err = 0;
+
+ if(ctxt->suspend){
+ err = ctxt->suspend(ctxt->domain, ctxt->data);
+ } else {
+ err = -EINVAL;
+ }
+ return err;
+}
+
static inline int xcio_read(XcIOContext *ctxt, void *buf, int n){
int rc;
def xend_domain(self, id):
return xend_get(self.domainurl(id))
+ def xend_domain_configure(self, id, config):
+ return xend_call(self.domainurl(id),
+ {'op' : 'configure',
+ 'config' : fileof(conf) })
+
def xend_domain_unpause(self, id):
return xend_call(self.domainurl(id),
{'op' : 'unpause'})
return dominfo
deferred.addCallback(fn)
return deferred
+
+ def domain_configure(self, id, config):
+ """Configure an existing domain. This is intended for internal
+ use by domain restore and migrate.
+
+ @param id: domain id
+ @param config: configuration
+ @return: deferred
+ """
+ dom = int(id)
+ dominfo = self.domain_get(dom)
+ if not dominfo:
+ raise ValueError("Invalid domain: " + str(id))
+ if dominfo.config:
+ raise ValueError("Domain already configured: " + str(id))
+ def fn(dominfo):
+ self._add_domain(dominfo.id, dominfo)
+ return dominfo
+ deferred = dominfo.construct(config)
+ deferred.addCallback(fn)
+ return deferred
+
+ def domain_restore(self, src, progress=0):
+ """Restore a domain from file.
+
+ @param src: source file
+ @param progress: output progress if true
+ @return: deferred
+ """
+
+ def fn(dominfo):
+ self._add_domain(dominfo.id, dominfo)
+ return dominfo
+ deferred = XendDomainInfo.vm_restore(src, progress=progress)
+ deferred.addCallback(fn)
+ return deferred
def domain_get(self, id):
"""Get up-to-date info about a domain.
- reboot: domain will restart.
- halt: domain will not restart (even if has autorestart set).
+ Returns immediately.
+
@param id: domain id
@param reason: shutdown type: poweroff, reboot, suspend, halt
"""
def domain_destroy(self, id):
"""Terminate domain immediately.
- Camcels any restart for the domain.
+ Cancels any restart for the domain.
@param id: domain id
"""
"""Start domain migration.
@param id: domain id
+ @return: deferred
"""
# Need a cancel too?
# Don't forget to cancel restart for it.
return xmigrate.migrate_begin(dom, dst)
def domain_save(self, id, dst, progress=0):
- """Save domain state to file, destroy domain on success.
- Leave domain running on error.
+ """Start saving a domain to file.
@param id: domain id
@param dst: destination file
@param progress: output progress if true
+ @return: deferred
"""
dom = int(id)
- dominfo = self.domain_get(id)
- if not dominfo:
- return -1
- vmconfig = sxp.to_string(dominfo.sxpr())
- self.domain_pause(id)
- eserver.inject('xend.domain.save', id)
- try:
- rc = xc.linux_save(dom=dom, state_file=dst,
- vmconfig=vmconfig, progress=progress)
- except:
- rc = -1
- if rc == 0:
- self.domain_destroy(id)
- else:
- self.domain_unpause(id)
- return rc
-
- def domain_restore(self, src, progress=0):
- """Restore domain from file.
-
- @param src : source file
- @param progress: output progress if true
- @return: domain object
- """
- dominfo = XendDomainInfo.vm_restore(src, progress=progress)
- self._add_domain(dominfo.id, dominfo)
- return dominfo
+ xmigrate = XendMigrate.instance()
+ return xmigrate.save_begin(dom, dst)
def domain_pincpu(self, dom, cpu):
"""Pin a domain to a cpu.
# Copyright (C) 2004 Mike Wray <mike.wray@hp.com>
+import errno
import sys
import socket
import time
import XendDB
import EventServer; eserver = EventServer.instance()
+"""The port for the migrate/save daemon xfrd."""
XFRD_PORT = 8002
+"""The transfer protocol major version number."""
XFR_PROTO_MAJOR = 1
+"""The transfer protocol minor version number."""
XFR_PROTO_MINOR = 0
class Xfrd(Protocol):
"""Factory for clients of the migration/save daemon xfrd.
"""
- def __init__(self, minfo):
+ def __init__(self, xinfo):
#ClientFactory.__init__(self)
- self.minfo = minfo
+ self.xinfo = xinfo
def startedConnecting(self, connector):
print 'Started to connect', 'self=', self, 'connector=', connector
def buildProtocol(self, addr):
print 'buildProtocol>', addr
- return Migrate(self.minfo)
+ return Xfrd(self.xinfo)
def clientConnectionLost(self, connector, reason):
print 'clientConnectionLost>', 'connector=', connector, 'reason=', reason
"""Abstract class for info about a session with xfrd.
Has subclasses for save and migrate.
"""
-
+
+ def __init__(self):
+ from xen.xend import XendDomain
+ self.xd = XendDomain.instance()
+ self.deferred = defer.Deferred()
+
def vmconfig(self):
print 'vmconfig>'
- from xen.xend import XendDomain
- xd = XendDomain.instance()
-
- dominfo = xd.domain_get(self.src_dom)
+ dominfo = self.xd.domain_get(self.src_dom)
print 'vmconfig>', type(dominfo), dominfo
if dominfo:
val = sxp.to_string(dominfo.sxpr())
def error(self, err):
self.state = 'error'
+ if not self.deferred.called:
+ self.deferred.errback(err)
def dispatch(self, xfrd, val):
op = sxp.name(val)
if op.startswith('xfr_'):
fn = getattr(self, op, self.unknown)
else:
- fn = self.unknown()
- fn(xfrd, val)
+ fn = self.unknown
+ val = fn(xfrd, val)
+ if val is not None:
+ sxp.show(val, out=self.transport)
def unknown(self, xfrd, val):
print 'unknown>', val
+ xfrd.loseConnection()
+ return None
def xfr_err(self, xfrd, val):
- # If we get an error with non-zero code the migrate failed.
+ # If we get an error with non-zero code the operation failed.
# An error with code zero indicates hello success.
print 'xfr_err>', val
v = sxp.child(val)
if not err: return
self.error(err);
xfrd.loseConnection()
+ #try:
+ # self.xd.domain_unpause(self.src_dom)
+ #except:
+ # print >>sys.stdout, "Error unpausing domain:", self.src_dom
+ return None
def xfr_progress(self, val):
print 'xfr_progress>', val
-
- def xfr_domain_pause(self, val):
- print 'xfr__domain_pause>', val
-
- def xfr_domain_suspend(self, val):
- print 'xfr_domain_suspend>', val
+ return None
+
+ def xfr_vm_pause(self, val):
+ print 'xfr_vm_pause>', val
+ try:
+ vmid = sxp.child0(val)
+ val = self.xd.domain_pause(vmid)
+ except:
+ val = errno.EINVAL
+ return ['xfr.err', val]
+
+ def xfr_vm_unpause(self, val):
+ print 'xfr_vm_unpause>', val
+ try:
+ vmid = sxp.child0(val)
+ val = self.xd.domain_unpause(vmid)
+ except:
+ val = errno.EINVAL
+ return ['xfr.err', val]
+
+ def xfr_vm_suspend(self, val):
+ print 'xfr_vm_suspend>', val
+ try:
+ vmid = sxp.child0(val)
+ val = self.xd.domain_shutdown(vmid, reason='suspend')
+ except:
+ val = errno.EINVAL
+ return ['xfr.err', val]
class XendMigrateInfo(XfrdInfo):
"""Representation of a migrate in-progress and its interaction with xfrd.
"""
def __init__(self, id, dom, host, port):
+ XfrdInfo.__init__(self)
self.id = id
self.state = 'begin'
self.src_host = socket.gethostname()
self.dst_port = port
self.dst_dom = None
self.start = 0
- self.deferred = defer.Deferred()
def sxpr(self):
sxpr = ['migrate', ['id', self.id], ['state', self.state] ]
self.src_dom,
vmconfig,
self.dst_host,
- self.d.dst_port])
+ self.dst_port])
def xfr_migrate_ok(self, val):
dom = int(sxp.child0(val))
self.state = 'ok'
self.dst_dom = dom
+ self.xd_domain_destroy(self.src_dom)
+ if not self.deferred.called:
+ self.deferred.callback(self)
def connectionLost(self, reason=None):
if self.state =='ok':
"""
def __init__(self, id, dom, file):
+ XfrdInfo.__init__(self)
self.id = id
self.state = 'begin'
self.src_dom = dom
self.file = file
self.start = 0
- self.deferred = defer.Deferred()
def sxpr(self):
sxpr = ['save',
def xfr_save_ok(self, val):
dom = int(sxp.child0(val))
self.state = 'ok'
+ self.xd_domain_destroy(self.src_dom)
+ if not self.deferred.called:
+ self.deferred.callback(self)
def connectionLost(self, reason=None):
if self.state =='ok':
"""External api for interaction with xfrd for migrate and save.
Singleton.
"""
- # Represents migration in progress.
# Use log for indications of begin/end/errors?
# Need logging of: domain create/halt, migrate begin/end/fail
# Log via event server?
def __init__(self):
self.db = XendDB.XendDB(self.dbpath)
- self.migrate = {}
- self.migrate_db = self.db.fetchall("")
+ self.session = {}
+ self.session_db = self.db.fetchall("")
self.id = 0
def nextid(self):
return "%d" % self.id
def sync(self):
- self.db.saveall("", self.migrate_db)
+ self.db.saveall("", self.session_db)
- def sync_migrate(self, id):
- self.db.save(id, self.migrate_db[id])
+ def sync_session(self, id):
+ self.db.save(id, self.session_db[id])
def close(self):
pass
- def _add_migrate(self, id, info):
- self.migrate[id] = info
- self.migrate_db[id] = info.sxpr()
- self.sync_migrate(id)
+ def _add_session(self, id, info):
+ self.session[id] = info
+ self.session_db[id] = info.sxpr()
+ self.sync_session(id)
#eserver.inject('xend.migrate.begin', info.sxpr())
- def _delete_migrate(self, id):
+ def _delete_session(self, id):
#eserver.inject('xend.migrate.end', id)
- del self.migrate[id]
- del self.migrate_db[id]
+ del self.session[id]
+ del self.session_db[id]
self.db.delete(id)
- def migrate_ls(self):
- return self.migrate.keys()
+ def session_ls(self):
+ return self.session.keys()
- def migrates(self):
- return self.migrate.values()
+ def sessions(self):
+ return self.session.values()
- def migrate_get(self, id):
- return self.migrate.get(id)
+ def session_get(self, id):
+ return self.session.get(id)
+
+ def session_begin(self, info):
+ self._add_session(id, info)
+ mcf = XfrdClientFactory(info)
+ reactor.connectTCP('localhost', XFRD_PORT, mcf)
+ return info
def migrate_begin(self, dom, host, port=XFRD_PORT):
+ """Begin to migrate a domain to another host.
+
+ @param dom: domain
+ @param host: destination host
+ @param port: destination port
+ @return: deferred
+ """
# Check dom for existence, not migrating already.
# Subscribe to migrate notifications (for updating).
id = self.nextid()
info = XendMigrateInfo(id, dom, host, port)
- self._add_migrate(id, info)
- mcf = XfrdClientFactory(info)
- reactor.connectTCP('localhost', XFRD_PORT, mcf)
- return info
+ self.session_begin(info)
+ return info.deferred
+
+ def save_begin(self, dom, file):
+ """Begin saving a domain to file.
+
+ @param dom: domain
+ @param file: destination file
+ @return: deferred
+ """
+ id = self.nextid()
+ info = XendSaveInfo(id, dom, file)
+ self.session_begin(info)
+ return info.deferred
def instance():
global inst
self.xd = XendDomain.instance()
self.xconsole = XendConsole.instance()
+ def op_configure(self, op, req):
+ fn = FormFn(self.xd.domain_configure,
+ [['dom', 'int'],
+ ['config', 'sxp']])
+ val = fn(req.args, {'dom': self.dom.id})
+ #todo: may need to add ok and err callbacks.
+ return val
+
def op_unpause(self, op, req):
val = self.xd.domain_unpause(self.dom.id)
return val
return val
def op_shutdown(self, op, req):
- #val = self.xd.domain_shutdown(self.dom.id)
fn = FormFn(self.xd.domain_shutdown,
[['dom', 'int'],
['reason', 'str']])
req.write('<form method="post" action="%s">' % req.prePathURL())
req.write('<input type="submit" name="op" value="unpause">')
req.write('<input type="submit" name="op" value="pause">')
- req.write('<input type="submit" name="op" value="shutdown">')
req.write('<input type="submit" name="op" value="destroy">')
+ req.write('</form>')
+
+ req.write('<form method="post" action="%s">' % req.prePathURL())
+ req.write('<input type="submit" name="op" value="shutdown">')
+ req.write('<input type="radio" name="reason" value="poweroff" checked>Poweroff<br>')
+ req.write('<input type="radio" name="reason" value="halt">Halt<br>')
+ req.write('<input type="radio" name="reason" value="reboot">Reboot<br>')
+ req.write('</form>')
+
+ req.write('<form method="post" action="%s">' % req.prePathURL())
+ req.write('<br><input type="submit" name="op" value="save">')
+ req.write('To file: <input type="text" name="file">')
+ req.write('</form>')
+
+ req.write('<form method="post" action="%s">' % req.prePathURL())
req.write('<br><input type="submit" name="op" value="migrate">')
- req.write('To: <input type="text" name="destination">')
+ req.write('To host: <input type="text" name="destination">')
req.write('</form>')
def op_restore(self, op, req):
"""Restore a domain from file.
+
+ @return: deferred
"""
+ #todo: return is deferred. May need ok and err callbacks.
fn = FormFn(self.xd.domain_restore,
[['file', 'str']])
val = fn(req.args)
req.write('<button type="submit" name="op" value="create">Create Domain</button>')
req.write('Config <input type="file" name="config"><br>')
req.write('</form>')
+
req.write('<form method="post" action="%s" enctype="multipart/form-data">'
% req.prePathURL())
- req.write('<button type="submit" name="op" value="create">Restore Domain</button>')
+ req.write('<button type="submit" name="op" value="restore">Restore Domain</button>')
req.write('State <input type="string" name="state"><br>')
req.write('</form>')
vpath %.h $(XEN_XU)
INCLUDES += -I $(XEN_XU)
+vpath %.h $(XEN_LIBXC)
+INCLUDES += -I $(XEN_LIBXC)
+
vpath %c $(XEN_LIBXUTIL)
INCLUDES += -I $(XEN_LIBXUTIL)
XFRD_PROG_OBJ = $(XFRD_PROG_SRC:.c=.o)
XFRD_PROG_OBJ += $(UTIL_LIB)
-CPPFLAGS += -D _XEN_XFR_STUB_
+# Flag controlling whether to use stubs.
+# Define to use stubs, undefine to use the real Xen functions.
+#CPPFLAGS += -D _XEN_XFR_STUB_
CFLAGS += -g
CFLAGS += -Wall
CFLAGS += -Wp,-MD,.$(@F).d
PROG_DEP = .*.d
-#LDFLAGS += -L $(COMPRESS_DIR) -lz
+#$(warning XFRD_PROG_OBJ= $(XFRD_PROG_OBJ))
+#$(warning UTIL_LIB= $(UTIL_LIB))
+#$(warning UTIL_LIB_OBJ= $(UTIL_LIB_OBJ))
+
+# Libraries for xfrd.
+XFRD_LIBS :=
+
+XFRD_LIBS += -L $(XEN_LIBXC) -lxc
+XFRD_LIBS += -L $(XEN_LIBXUTIL) -lxutil
+
+# zlib library.
+XFRD_LIBS += -lz
+
+CURL_FLAGS = $(shell curl-config --cflags)
+CURL_LIBS = $(shell curl-config --libs)
+CFLAGS += $(CURL_FLAGS)
+# libcurl libraries.
+XFRD_LIBS += $(CURL_LIBS)
-$(warning XFRD_PROG_OBJ= $(XFRD_PROG_OBJ))
-$(warning UTIL_LIB= $(UTIL_LIB))
-$(warning UTIL_LIB_OBJ= $(UTIL_LIB_OBJ))
+#$(warning XFRD_LIBS = $(XFRD_LIBS))
all: xfrd
-xfrd: $(XFRD_PROG_OBJ) -lz
+xfrd: $(XFRD_PROG_OBJ)
+ $(CC) -o $@ $^ $(XFRD_LIBS)
.PHONY: install
install: xfrd
$(RM) *.o *.a *.so *~ xfrd
$(RM) $(PROG_DEP)
+$(XFRD_PROG_OBJ): Makefile
-include $(PROG_DEP)
#include <stdio.h>
#ifndef _XEN_XFR_STUB_
-#include "dom0_defs.h"
-#include "mem_defs.h"
+#include "xc.h"
+#include "xc_io.h"
#endif
#include "xen_domain.h"
#include "marshal.h"
#include "xdr.h"
+#include "xfrd.h"
#define MODULE_NAME "XFRD"
#define DEBUG 1
#include "debug.h"
+#ifndef _XEN_XFR_STUB_
+static int domain_suspend(u32 dom, void *data){
+ Conn *xend = data;
+
+ return xfr_vm_suspend(xend, dom);
+}
+
+static int xc_handle = 0;
+
+int xcinit(void){
+ if(xc_handle <= 0){
+ xc_handle = xc_interface_open();
+ }
+ return xc_handle;
+}
+
+void xcfini(void){
+ if(xc_handle > 0){
+ xc_interface_close(xc_handle);
+ xc_handle = 0;
+ }
+}
+#endif
+
/** Write domain state.
*
* At some point during this the domain is suspended, and then there's no way back.
*/
int xen_domain_snd(Conn *xend, IOStream *io, uint32_t dom, char *vmconfig, int vmconfig_n){
int err = 0;
+#ifdef _XEN_XFR_STUB_
char buf[1024];
int n, k, d, buf_n;
dprintf("> dom=%d\n", dom);
-#ifdef _XEN_XFR_STUB_
err = marshal_uint32(io, dom);
if(err) goto exit;
err = marshal_string(io, vmconfig, vmconfig_n);
exit:
#else
+ XcIOContext _ioctxt = {}, *ioctxt = &_ioctxt;
+ dprintf("> dom=%d\n", dom);
+ ioctxt->io = io;
+ ioctxt->info = iostdout;
+ ioctxt->err = iostderr;
+ ioctxt->data = xend;
+ ioctxt->suspend = domain_suspend;
+
+ err = xc_linux_save(xcinit(), ioctxt);
#endif
dprintf("< err=%d\n", err);
return err;
*/
int xen_domain_rcv(IOStream *io, uint32_t *dom, char **vmconfig, int *vmconfig_n){
int err = 0;
+#ifdef _XEN_XFR_STUB_
char buf[1024];
int n, k, d, buf_n;
dprintf(">\n");
-#ifdef _XEN_XFR_STUB_
err = unmarshal_uint32(io, dom);
if(err) goto exit;
err = unmarshal_new_string(io, vmconfig, vmconfig_n);
}
exit:
#else
+ XcIOContext _ioctxt = {}, *ioctxt = &_ioctxt;
+ dprintf(">\n");
+ ioctxt->io = io;
+ ioctxt->info = iostdout;
+ ioctxt->err = iostderr;
+
+ err = xc_linux_restore(xcinit(), ioctxt);
#endif
dprintf("< err=%d\n", err);
return err;
}
-/** Configure a new domain. Talk to xend. Use libcurl?
+#include <curl/curl.h>
+
+static int do_curl_global_init = 1;
+
+static CURL *curlinit(void){
+ if(do_curl_global_init){
+ do_curl_global_init = 0;
+ curl_global_init(CURL_GLOBAL_ALL);
+ }
+ return curl_easy_init();
+}
+
+/** Configure a new domain. Talk to xend using libcurl.
*/
int xen_domain_configure(uint32_t dom, char *vmconfig, int vmconfig_n){
int err = 0;
- dprintf(">\n");
+ CURL *curl = NULL;
+ CURLcode curlcode = 0;
+ char domainurl[128] = {};
+ int domainurl_n = sizeof(domainurl) - 1;
+ int n;
+ struct curl_httppost *form = NULL, *last = NULL;
+ CURLFORMcode formcode = 0;
+
+ dprintf("> dom=%u\n", dom);
+ curl = curlinit();
+ if(!curl){
+ eprintf("> Could not init libcurl\n");
+ err = -ENOMEM;
+ goto exit;
+ }
+ n = snprintf(domainurl, domainurl_n,
+ "http://localhost:%d/xend/domain/%u", XEND_PORT, dom);
+ if(n <= 0 || n >= domainurl_n){
+ err = -ENOMEM;
+ eprintf("Out of memory in url.\n");
+ goto exit;
+ }
+ // Config field - set from vmconfig.
+ formcode = curl_formadd(&form, &last,
+ CURLFORM_COPYNAME, "config",
+ CURLFORM_BUFFER, "config",
+ CURLFORM_BUFFERPTR, vmconfig,
+ CURLFORM_BUFFERLENGTH, vmconfig_n,
+ CURLFORM_CONTENTTYPE, "application/octet-stream",
+ CURLFORM_END);
+ if(formcode){
+ eprintf("> Error adding config field.\n");
+ goto exit;
+ }
+ // Op field.
+ formcode = curl_formadd(&form, &last,
+ CURLFORM_COPYNAME, "op",
+ CURLFORM_COPYCONTENTS, "configure",
+ CURLFORM_END);
+
+ if(formcode){
+ eprintf("> Error adding op field.\n");
+ goto exit;
+ }
+ // No progress meter.
+ //curl_easy_setopt(curl, CURLOPT_NOPROGRESS, 1);
+ // Completely quiet.
+ //curl_easy_setopt(curl, CURLOPT_MUTE, 1);
+ // Set the URL.
+ curl_easy_setopt(curl, CURLOPT_URL, domainurl);
+ // POST the form.
+ curl_easy_setopt(curl, CURLOPT_HTTPPOST, form);
+ dprintf("> curl perform...\n");
#ifdef _XEN_XFR_STUB_
-#else
-#endif
+ dprintf("> _XEN_XFR_STUB_ defined - not calling xend\n");
+ curlcode = 0;
+#else
+ curlcode = curl_easy_perform(curl);
+#endif
+ exit:
+ if(curl) curl_easy_cleanup(curl);
+ if(form) curl_formfree(form);
+ if(formcode){
+ dprintf("> formcode=%d\n", formcode);
+ err = -EINVAL;
+ }
+ if(curlcode){
+ dprintf("> curlcode=%d\n", curlcode);
+ err = -EINVAL;
+ }
dprintf("< err=%d\n", err);
return err;
}
Sxpr oxfr_err; // (xfr.err <code>)
Sxpr oxfr_hello; // (xfr.hello <major> <minor>)
Sxpr oxfr_migrate; // (xfr.migrate <vmid> <vmconfig> <host> <port>)
-Sxpr oxfr_ok; // (xfr.ok <value>)
+Sxpr oxfr_migrate_ok;// (xfr.migrate.ok <value>)
Sxpr oxfr_progress; // (xfr.progress <percent> <rate: kb/s>)
Sxpr oxfr_save; // (xfr.save <vmid> <vmconfig> <file>)
-Sxpr oxfr_suspend; // (xfr.suspend <vmid>)
+Sxpr oxfr_save_ok; // (xfr.save.ok)
+Sxpr oxfr_vm_suspend;// (xfr.vm.suspend <vmid>)
Sxpr oxfr_xfr; // (xfr.xfr <vmid>)
+Sxpr oxfr_xfr_ok; // (xfr.xfr.ok <vmid>)
void xfr_init(void){
- oxfr_configure = intern("xfr.configure");
- oxfr_err = intern("xfr.err");
- oxfr_hello = intern("xfr.hello");
- oxfr_migrate = intern("xfr.migrate");
- oxfr_ok = intern("xfr.ok");
- oxfr_progress = intern("xfr.progress");
- oxfr_save = intern("xfr.save");
- oxfr_suspend = intern("xfr.suspend");
- oxfr_xfr = intern("xfr.xfr");
+ oxfr_configure = intern("xfr.configure");
+ oxfr_err = intern("xfr.err");
+ oxfr_hello = intern("xfr.hello");
+ oxfr_migrate = intern("xfr.migrate");
+ oxfr_migrate_ok = intern("xfr.migrate.ok");
+ oxfr_progress = intern("xfr.progress");
+ oxfr_save = intern("xfr.save");
+ oxfr_save_ok = intern("xfr.save.ok");
+ oxfr_vm_suspend = intern("xfr.vm.suspend");
+ oxfr_xfr = intern("xfr.xfr");
+ oxfr_xfr_ok = intern("xfr.xfr.ok");
}
#ifndef TRUE
return (err < 0 ? err : 0);
}
-int xfr_send_ok(Conn *conn, uint32_t vmid){
+int xfr_send_xfr_ok(Conn *conn, uint32_t vmid){
int err = 0;
err = IOStream_print(conn->out, "(%s %d)",
- atom_name(oxfr_ok), vmid);
+ atom_name(oxfr_xfr_ok), vmid);
+ return (err < 0 ? err : 0);
+}
+
+int xfr_send_migrate_ok(Conn *conn, uint32_t vmid){
+ int err = 0;
+
+ err = IOStream_print(conn->out, "(%s %d)",
+ atom_name(oxfr_migrate_ok), vmid);
+ return (err < 0 ? err : 0);
+}
+
+int xfr_send_save_ok(Conn *conn){
+ int err = 0;
+
+ err = IOStream_print(conn->out, "(%s)",
+ atom_name(oxfr_save_ok));
return (err < 0 ? err : 0);
}
int err = 0;
err = IOStream_print(conn->out, "(%s %d)",
- atom_name(oxfr_suspend), vmid);
+ atom_name(oxfr_vm_suspend), vmid);
return (err < 0 ? err : 0);
}
+/** Suspend a vm on behalf of save/migrate.
+ */
+int xfr_vm_suspend(Conn *xend, uint32_t vmid){
+ int err = 0;
+ err = xfr_send_suspend(xend, vmid);
+ if(err) goto exit;
+ IOStream_flush(xend->out);
+ err = xfr_response(xend);
+ exit:
+ return err;
+}
+
/** Get vm state. Send transfer message.
*
* @param peer connection
int errcode;
err = intof(sxpr_childN(sxpr, 0, ONONE), &errcode);
if(!err) err = errcode;
- } else if(sxpr_elementp(sxpr, oxfr_ok)){
+ } else if(sxpr_elementp(sxpr, oxfr_xfr_ok)){
// Ok - get the new domain id.
err = intof(sxpr_childN(sxpr, 0, ONONE), &state->vmid_new);
xfr_error(peer, err);
err = xfr_error(xend, first_err);
} else {
// Report new domain id to xend.
- err = xfr_send_ok(xend, state->vmid_new);
+ err = xfr_send_migrate_ok(xend, state->vmid_new);
}
XfrState_set_err(state, err);
goto exit;
}
err = xen_domain_snd(xend, io, state->vmid, state->vmconfig, state->vmconfig_n);
+ if(err){
+ err = xfr_error(xend, err);
+ } else {
+ err = xfr_send_save_ok(xend);
+ }
exit:
if(io){
IOStream_close(io);
if(err) goto exit;
// Report new domain id to peer.
- err = xfr_send_ok(peer, state->vmid_new);
+ err = xfr_send_xfr_ok(peer, state->vmid_new);
if(err) goto exit;
// Get the final ok.
err = xfr_response(peer);
#define _XFRD_XFRD_H_
/** Xend port in host order. */
-#define XEND_PORT 8001
+#define XEND_PORT 8000
/** Xfrd port in host order. */
#define XFRD_PORT 8002
#define XFR_PROTO_MAJOR 1
#define XFR_PROTO_MINOR 0
+struct Conn;
+extern int xfr_vm_suspend(struct Conn *xend, uint32_t vmid);
#endif